Add automatic execution planning for Dask workflows #9

AdvancedImagingUTSW wants to merge 2 commits into main
Conversation
Pull request overview
This PR adds an automatic execution-planning layer to ClearEx's Dask workflows, introducing operator-facing policy/config models and persisting both the requested policy and the derived effective execution plan across the CLI, GUI, runtime, and provenance.

Changes:
- Introduces `ExecutionPolicy`, environment capability detection, calibration profiles, and `ExecutionPlan` models, plus `plan_execution()` in `workflow.py`.
- Wires execution policy/planning through CLI args, runtime backend startup, GUI dialogs, and provenance persistence.
- Updates tests to ensure legacy "advanced backend override" behavior remains supported.
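For orientation, the policy/plan split might look roughly like the following. This is a minimal hypothetical sketch, not the PR's actual definitions: the real `plan_execution()` in `src/clearex/workflow.py` takes a `WorkflowConfig` plus sizing hints (`shape_tpczyx`, `chunks_tpczyx`, `dtype_itemsize`) and calibration profiles, and the models carry more fields than shown here.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical, simplified shapes of the models this PR introduces.

@dataclass(frozen=True)
class ExecutionPolicy:
    """Operator-facing request: how execution *should* be chosen."""
    mode: str = "auto"              # e.g. "auto" or "advanced" (manual override)
    max_workers: Optional[int] = None
    prefer_gpu: bool = True

@dataclass(frozen=True)
class ExecutionPlan:
    """Derived, effective plan actually used at runtime."""
    scheduler: str                  # e.g. "threads" or "processes"
    n_workers: int
    use_gpu: bool

def plan_execution(policy: ExecutionPolicy, detected_cores: int,
                   gpu_available: bool) -> ExecutionPlan:
    """Derive an effective plan from the requested policy plus the environment."""
    n_workers = policy.max_workers or detected_cores
    return ExecutionPlan(
        scheduler="processes" if n_workers > 1 else "threads",
        n_workers=n_workers,
        use_gpu=policy.prefer_gpu and gpu_available,
    )
```

The key design point the PR makes is that the *policy* (what the operator asked for) and the *plan* (what the environment allows) are persisted separately in provenance, so a run can be audited even when the two diverge.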
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/clearex/workflow.py` | Adds execution planning models, serialization helpers, environment detection, calibration profile generation, and the `plan_execution()` planner. |
| `src/clearex/main.py` | Loads execution policy, runs planning during backend configuration, persists calibration profiles, and records policy/plan in runtime metadata. |
| `src/clearex/gui/app.py` | Adds an "Execution Planning" GUI dialog and integrates planning summaries, persistence, and calibration profile caching. |
| `src/clearex/io/cli.py` | Adds CLI flags for execution planning mode and sizing hints. |
| `src/clearex/io/provenance.py` | Persists execution policy and effective plan (both structured and summary) in run provenance. |
| `tests/test_main.py` | Updates tests to explicitly set advanced execution mode where backend overrides are expected. |
```python
plan = plan_execution(
    workflow,
    workload=self._workload,
    shape_tpczyx=self._recommendation_shape_tpczyx,
    chunks_tpczyx=self._recommendation_chunks_tpczyx,
    dtype_itemsize=self._recommendation_dtype_itemsize,
    calibration_profiles=profiles,
)
```
`ExecutionPolicyDialog._refresh_summary` calls `plan_execution` on every `textChanged`/`currentIndexChanged` event. `plan_execution` performs environment detection (including GPU detection via a subprocess call to `nvidia-smi` with a timeout), which can block the UI thread and make the dialog feel laggy. Consider caching detected capabilities within the dialog (or adding a way to pass precomputed capabilities into `plan_execution`), and/or debouncing refreshes.
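One way to implement the caching half of this suggestion, as a sketch: memoize the expensive probe with a short TTL so repeated `_refresh_summary` calls reuse one detection result. The `detect_capabilities` probe name here is hypothetical; in the PR the detection currently lives inside `plan_execution` itself, so this assumes `plan_execution` grows a parameter for precomputed capabilities.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")

class TTLCache(Generic[T]):
    """Cache one expensive probe result (e.g. GPU detection) for ttl seconds."""

    def __init__(self, probe: Callable[[], T], ttl: float = 30.0) -> None:
        self._probe = probe
        self._ttl = ttl
        self._value: Optional[T] = None
        self._stamp: float = 0.0

    def get(self) -> T:
        now = time.monotonic()
        if self._value is None or now - self._stamp > self._ttl:
            # Only here does the slow nvidia-smi subprocess actually run.
            self._value = self._probe()
            self._stamp = now
        return self._value
```

The dialog would build `self._caps_cache = TTLCache(detect_capabilities)` once in `__init__` and pass `self._caps_cache.get()` into planning on each refresh; a `QTimer.singleShot`-style debounce on the signal handlers would complement this for rapid keystrokes.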
```python
workflow = WorkflowConfig(
    file=self._base_config.file,
    analysis_targets=self._analysis_targets,
    analysis_selected_experiment_path=(
        self._base_config.analysis_selected_experiment_path
    ),
    analysis_apply_to_all=bool(
        self._analysis_apply_to_all_checkbox.isChecked()
        if self._analysis_apply_to_all_checkbox is not None
        else False
    ),
    prefer_dask=self._base_config.prefer_dask,
    execution_policy=self._execution_policy,
    dask_backend=self._dask_backend_config,
    chunks=self._base_config.chunks,
    flatfield=self._operation_checkboxes["flatfield"].isChecked(),
    deconvolution=self._operation_checkboxes["deconvolution"].isChecked(),
    shear_transform=self._operation_checkboxes["shear_transform"].isChecked(),
    particle_detection=self._operation_checkboxes["particle_detection"].isChecked(),
    usegment3d=self._operation_checkboxes["usegment3d"].isChecked(),
    registration=self._operation_checkboxes["registration"].isChecked(),
    visualization=self._operation_checkboxes["visualization"].isChecked(),
    mip_export=self._operation_checkboxes["mip_export"].isChecked(),
    zarr_save=self._base_config.zarr_save,
    analysis_parameters=normalize_analysis_operation_parameters(
        self._base_config.analysis_parameters
    ),
)
plan = plan_execution(
    workflow,
    workload="analysis",
    shape_tpczyx=self._analysis_store_shape_tpczyx(),
    chunks_tpczyx=self._base_config.zarr_save.chunks_tpczyx(),
    dtype_itemsize=self._analysis_store_dtype_itemsize(),
    calibration_profiles=_load_execution_calibration_profiles(),
)
text = (
    f"Policy: {format_execution_policy_summary(self._execution_policy)}\n"
    f"Plan: {format_execution_plan_summary(plan)}"
)
```
The `_refresh_dask_backend_summary` docstring says errors are handled internally, but the new code calls `WorkflowConfig(...)` and `plan_execution(...)` without a try/except. If persisted execution policy/backend settings or environment detection triggers a `ValueError`, this will raise on the GUI thread. Wrap plan derivation in a try/except and fall back to a safe summary message (similar to other planning summaries in this file).
Suggested change:

```python
try:
    workflow = WorkflowConfig(
        file=self._base_config.file,
        analysis_targets=self._analysis_targets,
        analysis_selected_experiment_path=(
            self._base_config.analysis_selected_experiment_path
        ),
        analysis_apply_to_all=bool(
            self._analysis_apply_to_all_checkbox.isChecked()
            if self._analysis_apply_to_all_checkbox is not None
            else False
        ),
        prefer_dask=self._base_config.prefer_dask,
        execution_policy=self._execution_policy,
        dask_backend=self._dask_backend_config,
        chunks=self._base_config.chunks,
        flatfield=self._operation_checkboxes["flatfield"].isChecked(),
        deconvolution=self._operation_checkboxes["deconvolution"].isChecked(),
        shear_transform=self._operation_checkboxes["shear_transform"].isChecked(),
        particle_detection=self._operation_checkboxes["particle_detection"].isChecked(),
        usegment3d=self._operation_checkboxes["usegment3d"].isChecked(),
        registration=self._operation_checkboxes["registration"].isChecked(),
        visualization=self._operation_checkboxes["visualization"].isChecked(),
        mip_export=self._operation_checkboxes["mip_export"].isChecked(),
        zarr_save=self._base_config.zarr_save,
        analysis_parameters=normalize_analysis_operation_parameters(
            self._base_config.analysis_parameters
        ),
    )
    plan = plan_execution(
        workflow,
        workload="analysis",
        shape_tpczyx=self._analysis_store_shape_tpczyx(),
        chunks_tpczyx=self._base_config.zarr_save.chunks_tpczyx(),
        dtype_itemsize=self._analysis_store_dtype_itemsize(),
        calibration_profiles=_load_execution_calibration_profiles(),
    )
    text = (
        f"Policy: {format_execution_policy_summary(self._execution_policy)}\n"
        f"Plan: {format_execution_plan_summary(plan)}"
    )
except Exception:
    logging.getLogger(__name__).exception(
        "Failed to derive Dask execution plan for backend summary."
    )
    text = (
        f"Policy: {format_execution_policy_summary(self._execution_policy)}\n"
        "Plan: Unable to derive execution plan; see logs for details."
    )
```
```python
object.__setattr__(
    self,
    "cpu_utilization",
    max(0.05, float(self.cpu_utilization)),
)
```
`CalibrationProfile.__post_init__` clamps `cpu_utilization` to a minimum but does not cap it at 1.0 (unlike `confidence`). This allows persisted/loaded profiles to contain `cpu_utilization > 1.0`, which can leak into summaries/logic that assume a fraction. Consider clamping into [0.0, 1.0] here for consistency and robustness when reading cached profiles.
Suggested change:

```diff
-            max(0.05, float(self.cpu_utilization)),
+            max(0.05, min(1.0, float(self.cpu_utilization))),
```
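The clamp semantics being proposed can be factored into a small helper, sketched below. The helper name is hypothetical; the PR inlines the expression in `__post_init__`, and the floor of 0.05 matches the existing minimum rather than the comment's 0.0.

```python
def clamp_fraction(value: float, lo: float = 0.05, hi: float = 1.0) -> float:
    """Clamp a utilization fraction into [lo, hi].

    Mirrors the suggested fix: a floor guards against degenerate zero
    utilization, and the new ceiling keeps loaded profiles in fraction range.
    """
    return max(lo, min(hi, float(value)))
```

Reading stale cached profiles is exactly where the out-of-range values would come from, so doing this in `__post_init__` (rather than only at write time) protects both paths.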
```python
    f"scheduler={capabilities.scheduler_mode}",
]
if capabilities.attached_scheduler_file:
    parts.append(f"scheduler_file={capabilities.attached_scheduler_file}")
```
`_environment_fingerprint` includes the full `attached_scheduler_file` path. Scheduler-file paths are often ephemeral (e.g., per-job tmp paths), which will churn calibration-profile keys and prevent cache reuse; it also bakes absolute paths into persisted fingerprints. Consider omitting the path from the fingerprint (or replacing it with a stable boolean/marker like `attached_scheduler=yes`) so cached profiles remain reusable across runs.
Suggested change:

```diff
-    parts.append(f"scheduler_file={capabilities.attached_scheduler_file}")
+    parts.append("attached_scheduler=yes")
```
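A quick illustration of why the stable marker matters. This is a hypothetical standalone fingerprint helper, not the PR's `_environment_fingerprint`; it assumes the fingerprint is a hash over joined key=value parts.

```python
import hashlib
from typing import Optional

def environment_fingerprint(scheduler_mode: str,
                            attached_scheduler_file: Optional[str]) -> str:
    """Build a calibration cache key from stable environment facts only.

    Using a boolean marker instead of the ephemeral scheduler-file path keeps
    the fingerprint identical across jobs running in the same environment.
    """
    parts = [f"scheduler={scheduler_mode}"]
    if attached_scheduler_file:
        parts.append("attached_scheduler=yes")  # stable marker, not the tmp path
    return hashlib.sha256("|".join(parts).encode()).hexdigest()
```

With the raw path included, two otherwise identical HPC jobs (e.g. `/tmp/job-1234/scheduler.json` vs `/tmp/job-5678/scheduler.json`) would produce different keys and never share cached calibration profiles.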
Codecov Report: ❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main       #9      +/-   ##
==========================================
+ Coverage   48.29%   48.48%   +0.19%
==========================================
  Files          49       49
  Lines       15539    16243     +704
==========================================
+ Hits         7505     7876     +371
- Misses       8034     8367     +333
==========================================
```
Summary

Testing

- `uv run ruff check src/clearex/workflow.py src/clearex/main.py src/clearex/gui/app.py src/clearex/io/cli.py src/clearex/io/provenance.py tests/test_main.py`
- `uv run pytest -q tests/test_workflow.py tests/io/test_cli.py tests/io/test_provenance.py tests/test_main.py tests/gui/test_gui_execution.py`
- `uv run basedpyright src/clearex/workflow.py src/clearex/main.py src/clearex/gui/app.py src/clearex/io/cli.py src/clearex/io/provenance.py` (reported an existing noisy baseline and was not used as a gate)